/*
 * Unidata Platform
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 *
 * Commercial License
 * This version of Unidata Platform is licensed commercially and is the appropriate option for the vast majority of use cases.
 *
 * Please see the Unidata Licensing page at: https://unidata-platform.com/license/
 * For clarification or additional options, please contact: info@unidata-platform.com
 * -------
 * Disclaimer:
 * -------
 * THIS SOFTWARE IS DISTRIBUTED "AS-IS" WITHOUT ANY WARRANTIES, CONDITIONS AND
 * REPRESENTATIONS WHETHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE
 * IMPLIED WARRANTIES AND CONDITIONS OF MERCHANTABILITY, MERCHANTABLE QUALITY,
 * FITNESS FOR A PARTICULAR PURPOSE, DURABILITY, NON-INFRINGEMENT, PERFORMANCE AND
 * THOSE ARISING BY STATUTE OR FROM CUSTOM OR USAGE OF TRADE OR COURSE OF DEALING.
 */
package org.unidata.mdm.job.reindex.service.impl;

import java.sql.Connection;
import java.util.Objects;
import java.util.UUID;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.RowMapper;
import org.unidata.mdm.core.service.job.JobCommonParameters;
import org.unidata.mdm.core.type.keys.LSN;
import org.unidata.mdm.data.dao.StorageDAO;
import org.unidata.mdm.job.reindex.configuration.ReindexJobConfigurationConstants;
import org.unidata.mdm.system.type.runtime.MeasurementPoint;

/**
 * Step-scoped JDBC cursor reader of the reindex data job.
 * <p>
 * Each instance reads one partition, described by the LSN range
 * [{@code startLsn}, {@code endLsn}] and, unless the id log is processed, by the shard number.
 * Every row is mapped to a triple of (LSN key, etalon id, name).
 * The SQL statement is assembled in {@link #afterPropertiesSet()} from the injected
 * job parameters and step execution context values.
 *
 * @author Denis Kostovarov
 */
@StepScope
public class ReindexDataJobDataItemReader extends JdbcCursorItemReader<Triple<LSN, UUID, String>> {
    /**
     * Row mapper. Expects four columns in this order: lsn, shard, etalon id, name.
     * <p>
     * NOTE(review): only the query built for the DEFAULT / *_INITIAL_MULTIVERSIONS modes selects
     * four columns. The id log query and the *_UPDATE queries select three, so reading column 4
     * looks like it would fail for them - confirm against the actual schema and usage.
     */
    private static final RowMapper<Triple<LSN, UUID, String>> NEXT_GSN_ROW_MAPPER = (rs, rowNum) -> {
        final long lsn = rs.getLong(1);
        final int shard = rs.getInt(2);
        final UUID etalonId = rs.getObject(3, UUID.class);
        final String name = rs.getString(4);
        return Triple.of(LSN.of(shard, lsn), etalonId, name);
    };
    /**
     * If true, id log should be processed instead of normal data tables.
     */
    @Value("#{jobParameters[" + ReindexJobConfigurationConstants.PARAM_PROCESS_ID_LOG + "] ?: false}")
    private boolean processIdLog;
    /**
     * Run id.
     */
    @Value("#{jobParameters[" + JobCommonParameters.PARAM_RUN_ID + "]}")
    private String runId;
    /**
     * Operation id.
     */
    @Value("#{jobParameters[" + JobCommonParameters.PARAM_OPERATION_ID + "]}")
    private String operationId;
    /**
     * Entity name. Optional: when null, the queries are not restricted by name.
     */
    @Value("#{stepExecutionContext[" + JobCommonParameters.PARAM_ENTITY_NAME + "]}")
    private String entityName;
    /**
     * Start of chunk to process (inclusive).
     */
    @Value("#{stepExecutionContext[" + JobCommonParameters.PARAM_START_LSN + "]}")
    private Long startLsn;
    /**
     * End of chunk to process (inclusive).
     */
    @Value("#{stepExecutionContext[" + JobCommonParameters.PARAM_END_LSN + "]}")
    private Long endLsn;
    /**
     * Shard number. Selects the data source and the suffix of the partitioned tables.
     */
    @Value("#{stepExecutionContext[" + JobCommonParameters.PARAM_SHARD_NUMBER + "]}")
    private Integer shardNumber;
    /**
     * Execution mode. Falls back to DEFAULT when the step context does not define one.
     */
    @Value("#{stepExecutionContext[" + JobCommonParameters.PARAM_EXECUTION_MODE + "] ?: 'DEFAULT'}")
    private ReindexDataJobExecutionMode mode;
    /**
     * Storage DAO.
     */
    @Autowired
    private StorageDAO dataStorageDAO;

    /**
     * Opens the cursor, wrapping the call into a measurement point.
     *
     * @param con the connection to open the cursor on
     */
    @Override
    public void openCursor(Connection con) {
        MeasurementPoint.start();
        try {
            super.openCursor(con);
        } finally {
            MeasurementPoint.stop();
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Configures the data source (the default one for the id log, the shard's one otherwise),
     * the row mapper and the SQL statement before delegating to the parent.
     *
     * @throws IllegalStateException if no query is defined for the configured execution mode
     */
    @Override
    public void afterPropertiesSet() throws Exception {

        super.setDataSource(processIdLog ? dataStorageDAO.getDefaultDataSource() : dataStorageDAO.shardSelect(shardNumber).dataSource());
        super.setSaveState(true);
        super.setDriverSupportsAbsolute(true);
        super.setRowMapper(NEXT_GSN_ROW_MAPPER);
        super.setSql(getQuery());
        super.afterPropertiesSet();
    }

    /**
     * @param entityName the entityName to set
     */
    public void setEntityName(String entityName) {
        this.entityName = entityName;
    }

    /**
     * @param startLsn the startLsn to set
     */
    public void setStartLsn(Long startLsn) {
        this.startLsn = startLsn;
    }

    /**
     * @param endLsn the endLsn to set
     */
    public void setEndLsn(Long endLsn) {
        this.endLsn = endLsn;
    }

    /**
     * @param dataStorageDAO the dataStorageDAO to set
     */
    public void setDataStorageDAO(StorageDAO dataStorageDAO) {
        this.dataStorageDAO = dataStorageDAO;
    }

    /**
     * @param shardNumber the shardNumber to set
     */
    public void setShardNumber(Integer shardNumber) {
        this.shardNumber = shardNumber;
    }

    /**
     * Gets the appropriate query.
     *
     * @return query string, never null
     * @throws IllegalStateException if no query is defined for the configured execution mode
     */
    private String getQuery() {

        if (processIdLog) {
            return idLogQuery();
        }

        if (mode == ReindexDataJobExecutionMode.DEFAULT
         || mode == ReindexDataJobExecutionMode.IMPORT_RELATIONS_INITIAL_MULTIVERSIONS
         || mode == ReindexDataJobExecutionMode.IMPORT_RECORDS_INITIAL_MULTIVERSIONS) {
            return shardRangeQuery();
        } else if (mode == ReindexDataJobExecutionMode.IMPORT_RECORDS_UPDATE) {
            return recordsUpdateQuery();
        } else if (mode == ReindexDataJobExecutionMode.IMPORT_RELATIONS_UPDATE) {
            return relationsUpdateQuery();
        }

        // Fail here with the offending mode in the message instead of handing a null
        // statement to the parent and relying on its generic assertion.
        throw new IllegalStateException("No reindex query is defined for execution mode [" + mode + "].");
    }

    /**
     * Builds the id log statement: deletes the LSN range from the operation's log table and returns the deleted rows.
     */
    private String idLogQuery() {
        // The table name is derived from the operation id and can not be quoted as a literal,
        // so the operation id has to come from a trusted source.
        final String tableName = "reindex_id_log_" + StringUtils.replace(operationId, "-", "_");
        return "delete from " + tableName +
               " where id >= " + startLsn +
               " and id <= " + endLsn +
               " returning id, etalon_id, name";
    }

    /**
     * Builds the plain range query over the record etalons partition of the shard.
     */
    private String shardRangeQuery() {
        return "select lsn, shard, id, name from record_etalons_p" + shardNumber + " where" +
                shardRangePredicate() +
                (Objects.nonNull(entityName) ? " and name = " + quote(entityName) + " " : "");
    }

    /**
     * Builds the range query restricted to records touched by the current operation.
     */
    private String recordsUpdateQuery() {
        // NOTE(review): the sub-selects reference "etalons.id", but no relation or alias named
        // "etalons" is part of this FROM clause - verify the statement against the schema.
        final String op = quote(operationId);
        return "select gsn, id, name from record_etalons_p" + shardNumber + " where" +
                shardRangePredicate() +
                (Objects.nonNull(entityName) ? " and name = " + quote(entityName) : "") +
                " and ((record_etalons_p" + shardNumber + ".operation_id = " + op +
                " or exists (select true from record_origins_p" + shardNumber + " o, record_vistory_p" + shardNumber +
                " v where o.etalon_id = etalons.id and o.id = v.origin_id and v.operation_id = " + op +
                ")) or (exists (select true from etalons_classifiers e where e.etalon_id_record = etalons.id and e.operation_id = " + op +
                ") or exists (select true from etalons_classifiers e, origins_classifiers o, origins_classifiers_vistory v" +
                " where e.etalon_id_record = etalons.id and o.etalon_id = e.id and o.id = v.origin_id and v.operation_id = " + op +
                ")))";
    }

    /**
     * Builds the range query restricted to records whose outgoing relations were touched by the current operation.
     */
    private String relationsUpdateQuery() {
        final String op = quote(operationId);
        return "select gsn, id, name from etalons where " +
                (Objects.nonNull(entityName) ? "name = " + quote(entityName) + " and " : "") +
                "(lsn >= " + startLsn +
                " and lsn <= " + endLsn +
                ") and status in ('ACTIVE', 'INACTIVE')" +
                " and (exists (select true from etalons_relations e where e.etalon_id_from = etalons.id and e.operation_id = " + op +
                ") or exists (select true from etalons_relations e, origins_relations o, origins_relations_vistory v" +
                " where e.etalon_id_from = etalons.id and o.etalon_id = e.id and o.id = v.origin_id and v.operation_id = " + op +
                "))";
    }

    /**
     * Builds the shard / LSN range / status predicate shared by the sharded queries.
     */
    private String shardRangePredicate() {
        return " shard = " + shardNumber +
               " and lsn >= " + startLsn +
               " and lsn <= " + endLsn +
               " and status in ('ACTIVE', 'INACTIVE')";
    }

    /**
     * Renders a value as a single-quoted SQL string literal, doubling embedded single quotes
     * so that the value can not terminate the literal.
     * Assumes the database treats backslashes in string literals literally - TODO confirm.
     * A null value is rendered as the literal 'null', the same text plain concatenation produces.
     */
    private static String quote(String value) {
        return "'" + StringUtils.replace(value, "'", "''") + "'";
    }
}
